/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.solr.gcs;

import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.BufferedIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.backup.repository.AbstractBackupRepository;
import org.apache.solr.core.backup.repository.BackupRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** {@link BackupRepository} implementation that stores files in Google Cloud Storage ("GCS"). */
public class GCSBackupRepository extends AbstractBackupRepository {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
  private static final Storage.BlobWriteOption[] NO_WRITE_OPTIONS = new Storage.BlobWriteOption[0];

  protected Storage storage;

  protected String bucketName = null;
  protected String credentialPath = null;
  protected int writeBufferSizeBytes;
  protected int readBufferSizeBytes;
  protected StorageOptions.Builder storageOptionsBuilder = null;

  protected Storage initStorage() {
    if (storage != null) return storage;

    try {
      if (credentialPath != null) {
        log.info("Creating GCS client using credential at {}", credentialPath);
        // 'GoogleCredentials.fromStream' closes the input stream, so we don't
        GoogleCredentials credential =
            GoogleCredentials.fromStream(new FileInputStream(credentialPath));
        storageOptionsBuilder.setCredentials(credential);
      } else {
        // nowarn compile time string concatenation
        log.warn(GCSConfigParser.potentiallyMissingCredentialMsg()); // nowarn
      }
      storage = storageOptionsBuilder.build().getService();
    } catch (IOException e) {
      throw new IllegalStateException(e);
    }
    return storage;
  }

  @Override
  public void init(NamedList<?> args) {
    super.init(args);
    final GCSConfigParser configReader = new GCSConfigParser();
    final GCSConfigParser.GCSConfig parsedConfig = configReader.parseConfiguration(config);

    this.bucketName = parsedConfig.getBucketName();
    this.credentialPath = parsedConfig.getCredentialPath();
    this.writeBufferSizeBytes = parsedConfig.getWriteBufferSize();
    this.readBufferSizeBytes = parsedConfig.getReadBufferSize();
    this.storageOptionsBuilder = parsedConfig.getStorageOptionsBuilder();

    initStorage();
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T> T getConfigProperty(String name) {
    return (T) this.config.get(name);
  }

  @Override
  public URI createURI(String location) {
    Objects.requireNonNull(location);

    URI result;
    try {
      result = new URI(location);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Error on creating URI", e);
    }

    return result;
  }

  @Override
  public URI createDirectoryURI(String location) {
    Objects.requireNonNull(location);

    if (!location.endsWith("/")) {
      location += "/";
    }

    return createURI(location);
  }

  @Override
  public URI resolve(URI baseUri, String... pathComponents) {
    StringBuilder builder = new StringBuilder(baseUri.toString());
    for (String path : pathComponents) {
      if (path != null && !path.isEmpty()) {
        if (builder.charAt(builder.length() - 1) != '/') {
          builder.append('/');
        }
        builder.append(path);
      }
    }

    return URI.create(builder.toString()).normalize();
  }

  @Override
  public URI resolveDirectory(URI baseUri, String... pathComponents) {
    if (pathComponents.length > 0) {
      if (!pathComponents[pathComponents.length - 1].endsWith("/")) {
        pathComponents[pathComponents.length - 1] = pathComponents[pathComponents.length - 1] + "/";
      }
    } else {
      if (!baseUri.getPath().endsWith("/")) {
        baseUri = URI.create(baseUri + "/");
      }
    }
    return resolve(baseUri, pathComponents);
  }

  @Override
  public boolean exists(URI path) throws IOException {
    return exists(path.toString());
  }

  public boolean exists(String path) throws IOException {
    if (path.equals(getConfigProperty(CoreAdminParams.BACKUP_LOCATION))) {
      return true;
    }

    if (path.endsWith("/")) {
      return storage.get(bucketName, path, Storage.BlobGetOption.fields()) != null;
    } else {
      final String filePath = path;
      final String directoryPath = path + "/";
      return storage.get(bucketName, filePath, Storage.BlobGetOption.fields()) != null
          || storage.get(bucketName, directoryPath, Storage.BlobGetOption.fields()) != null;
    }
  }

  @Override
  public PathType getPathType(URI path) throws IOException {
    if (path.toString().endsWith("/")) return PathType.DIRECTORY;

    Blob blob = storage.get(bucketName, path.toString() + "/", Storage.BlobGetOption.fields());
    if (blob != null) return PathType.DIRECTORY;

    return PathType.FILE;
  }

  @Override
  public String[] listAll(URI path) throws IOException {
    final String blobName = appendTrailingSeparatorIfNecessary(path.toString());

    final String pathStr = blobName;
    final List<String> result = new ArrayList<>();
    storage
        .list(
            bucketName,
            Storage.BlobListOption.currentDirectory(),
            Storage.BlobListOption.prefix(pathStr),
            Storage.BlobListOption.fields())
        .iterateAll()
        .forEach(
            blob -> {
              assert blob.getName().startsWith(pathStr);
              final String suffixName = blob.getName().substring(pathStr.length());
              if (!suffixName.isEmpty()) {
                // Remove trailing '/' if present
                if (suffixName.endsWith("/")) {
                  result.add(suffixName.substring(0, suffixName.length() - 1));
                } else {
                  result.add(suffixName);
                }
              }
            });

    return result.toArray(new String[0]);
  }

  @Override
  public IndexInput openInput(URI dirPath, String fileName, IOContext ctx) throws IOException {
    return openInput(dirPath, fileName, ctx, readBufferSizeBytes);
  }

  private IndexInput openInput(URI dirPath, String fileName, IOContext ctx, int bufferSize) {
    String blobName = resolve(dirPath, fileName).toString();

    final BlobId blobId = BlobId.of(bucketName, blobName);
    final Blob blob = storage.get(blobId, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
    final ReadChannel readChannel = blob.reader();
    readChannel.setChunkSize(bufferSize);

    return new BufferedIndexInput(blobName, bufferSize) {

      @Override
      public long length() {
        return blob.getSize();
      }

      @Override
      protected void readInternal(ByteBuffer b) throws IOException {
        readChannel.read(b);
      }

      @Override
      protected void seekInternal(long pos) throws IOException {
        readChannel.seek(pos);
      }

      @Override
      public void close() throws IOException {
        readChannel.close();
      }
    };
  }

  @Override
  public OutputStream createOutput(URI path) throws IOException {
    final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path.toString()).build();
    final WriteChannel writeChannel = storage.writer(blobInfo, getDefaultBlobWriteOptions());

    return Channels.newOutputStream(
        new WritableByteChannel() {
          @Override
          public int write(ByteBuffer src) throws IOException {
            return writeChannel.write(src);
          }

          @Override
          public boolean isOpen() {
            return writeChannel.isOpen();
          }

          @Override
          public void close() throws IOException {
            writeChannel.close();
          }
        });
  }

  @Override
  public void createDirectory(URI path) throws IOException {
    final String name = appendTrailingSeparatorIfNecessary(path.toString());
    if (!exists(name)) {
      storage.create(BlobInfo.newBuilder(bucketName, name).build());
    }
  }

  @Override
  public void deleteDirectory(URI path) throws IOException {
    List<BlobId> blobIds = allBlobsAtDir(path);
    if (!blobIds.isEmpty()) {
      storage.delete(blobIds);
    } else {
      log.debug("Path:{} doesn't have any blobs", path);
    }
  }

  protected List<BlobId> allBlobsAtDir(URI path) throws IOException {
    final String blobName = appendTrailingSeparatorIfNecessary(path.toString());

    final List<BlobId> result = new ArrayList<>();
    final String pathStr = blobName;
    storage
        .list(bucketName, Storage.BlobListOption.prefix(pathStr), Storage.BlobListOption.fields())
        .iterateAll()
        .forEach(blob -> result.add(blob.getBlobId()));

    return result;
  }

  @Override
  public void delete(URI path, Collection<String> files) {
    if (files.isEmpty()) {
      return;
    }
    final String prefix = appendTrailingSeparatorIfNecessary(path.toString());
    List<BlobId> blobDeletes =
        files.stream()
            .map(file -> BlobId.of(bucketName, prefix + file))
            .collect(Collectors.toList());
    storage.delete(blobDeletes);
  }

  @Override
  public void copyIndexFileFrom(
      Directory sourceDir, String sourceFileName, URI destDir, String destFileName)
      throws IOException {
    String blobName = destDir.toString();
    blobName = appendTrailingSeparatorIfNecessary(blobName);
    blobName += destFileName;
    final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, blobName).build();
    try (IndexInput input =
        shouldVerifyChecksum
            ? sourceDir.openChecksumInput(sourceFileName)
            : sourceDir.openInput(sourceFileName, IOContext.READONCE)) {
      if (input.length() <= CodecUtil.footerLength()) {
        throw new CorruptIndexException("file is too small:" + input.length(), input);
      }
      if (input.length() > LARGE_BLOB_THRESHOLD_BYTE_SIZE) {
        writeBlobResumable(blobInfo, input);
      } else {
        writeBlobMultipart(blobInfo, input, (int) input.length());
      }
    }
  }

  @Override
  public void copyIndexFileTo(
      URI sourceRepo, String sourceFileName, Directory dest, String destFileName)
      throws IOException {
    try {
      String blobName = sourceRepo.toString();
      blobName = appendTrailingSeparatorIfNecessary(blobName);
      blobName += sourceFileName;
      final BlobId blobId = BlobId.of(bucketName, blobName);
      try (final ReadChannel readChannel = storage.reader(blobId);
          IndexOutput output =
              dest.createOutput(destFileName, DirectoryFactory.IOCONTEXT_NO_CACHE)) {
        ByteBuffer buffer = ByteBuffer.allocate(readBufferSizeBytes);
        while (readChannel.read(buffer) > 0) {
          buffer.flip();
          byte[] arr = buffer.array();
          output.writeBytes(arr, buffer.position(), buffer.limit() - buffer.position());
          buffer.clear();
        }
      }
    } catch (Exception e) {
      log.info("Here's an exception e", e);
    }
  }

  @Override
  public void close() throws IOException {}

  private void writeBlobMultipart(BlobInfo blobInfo, IndexInput indexInput, int blobSize)
      throws IOException {
    byte[] bytes = new byte[blobSize];
    if (shouldVerifyChecksum) {
      indexInput.readBytes(bytes, 0, blobSize - CodecUtil.footerLength());
      long checksum = CodecUtil.checkFooter((ChecksumIndexInput) indexInput);
      ByteBuffer footerBuffer =
          ByteBuffer.wrap(bytes, blobSize - CodecUtil.footerLength(), CodecUtil.footerLength());
      writeFooter(checksum, footerBuffer);
    } else {
      indexInput.readBytes(bytes, 0, blobSize);
    }
    try {
      storage.create(blobInfo, bytes, Storage.BlobTargetOption.doesNotExist());
    } catch (final StorageException se) {
      if (se.getCode() == HTTP_PRECON_FAILED) {
        throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
      }
      throw se;
    }
  }

  private void writeBlobResumable(BlobInfo blobInfo, IndexInput indexInput) throws IOException {
    try {
      final WriteChannel writeChannel = storage.writer(blobInfo, getDefaultBlobWriteOptions());

      ByteBuffer buffer = ByteBuffer.allocate(writeBufferSizeBytes);
      writeChannel.setChunkSize(writeBufferSizeBytes);

      long remain =
          shouldVerifyChecksum
              ? indexInput.length() - CodecUtil.footerLength()
              : indexInput.length();
      while (remain > 0) {
        // reading
        int byteReads = (int) Math.min(buffer.capacity(), remain);
        indexInput.readBytes(buffer.array(), 0, byteReads);
        buffer.position(byteReads);
        buffer.flip();

        // writing
        writeChannel.write(buffer);
        buffer.clear();
        remain -= byteReads;
      }
      if (shouldVerifyChecksum) {
        long checksum = CodecUtil.checkFooter((ChecksumIndexInput) indexInput);
        ByteBuffer bytes = getFooter(checksum);
        writeChannel.write(bytes);
      }
      writeChannel.close();
    } catch (final StorageException se) {
      if (se.getCode() == HTTP_PRECON_FAILED) {
        throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
      }
      throw se;
    }
  }

  private ByteBuffer getFooter(long checksum) throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(CodecUtil.footerLength());
    writeFooter(checksum, buffer);
    return buffer;
  }

  private void writeFooter(long checksum, ByteBuffer buffer) throws IOException {
    IndexOutput out =
        new IndexOutput("", "") {

          @Override
          public void writeByte(byte b) throws IOException {
            buffer.put(b);
          }

          @Override
          public void writeBytes(byte[] b, int offset, int length) throws IOException {
            buffer.put(b, offset, length);
          }

          @Override
          public void close() throws IOException {}

          @Override
          public long getFilePointer() {
            return 0;
          }

          @Override
          public long getChecksum() throws IOException {
            return checksum;
          }
        };
    CodecUtil.writeFooter(out);
    buffer.flip();
  }

  protected Storage.BlobWriteOption[] getDefaultBlobWriteOptions() {
    return NO_WRITE_OPTIONS;
  }

  private String appendTrailingSeparatorIfNecessary(String blobName) {
    if (!blobName.endsWith("/")) {
      return blobName + "/";
    }
    return blobName;
  }
}
